package com.atguigu.realtime.function;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.realtime.util.DimUtil;
import com.atguigu.realtime.util.JdbcUtil;
import com.atguigu.realtime.util.RedisUtil;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import redis.clients.jedis.Jedis;

import java.sql.Connection;


/**
 * @Author lzc
 * @Date 2023/3/20 14:08
 */
public abstract class DimMapFunction<T> extends RichMapFunction<T, T> {
    
    
    public abstract String getTable();
    public abstract String getId(T bean);
    public abstract void addDim(T bean, JSONObject dim);
    
    private Connection conn;
    private Jedis jedis;
    
    
    @Override
    public void open(Configuration parameters) throws Exception {
        conn = JdbcUtil.getPhoenixConnection();
    
        jedis = RedisUtil.getRedisClient();
    }
    
    @Override
    public void close() throws Exception {
        JdbcUtil.closeConnection(conn);
        
    }
    
    @Override
    public T map(T bean) throws Exception {
        // 返回一条维度数据: JSONObject
       // JSONObject dim = DimUtil.readDimFromPhoenix(conn, getTable(), getId(bean));
        JSONObject dim = DimUtil.readDim(jedis, conn, getTable(), getId(bean));
        
        
        // 补充维度(dim_sku_info): sku_name  spu_id  tm_id  c3_id
        // 补充维度(dim_spu_info): spu_name
        // ...
        addDim(bean, dim);  // 补充维度信息
        return bean;
    }
    
   
}
